并发

Author Avatar
子语 2018 - 01 - 17
  • 在其它设备中阅读本文章

通常程序都是顺序执行且代码只是为了完成独立的一个任务,因为这样的代码易写易维护。但某些情况下,并行执行多个任务更有好处,比如Web服务需要在各自的套接字(socket)上同时接收多少数据请求。每个套接字请求都是独立的,可以独立其他套接字进行处理,此时并行执行多个请求可以显著提高服务。Go语言的语法和运行时内置了对并发的支持。

Go中的并发指的是能让某个函数独立于其他函数运行。当一个函数创建为goroutine时,Go会将其视为一个独立的工作单元,这个单元会被调度到可用的逻辑处理器上运行。Go语言运行时的调度器是一个复杂的软件,能管理被创建的所有 goroutine并为其分配执行时间。这个调度器在操作系统之上,将操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行goroutine。调度器在任何给定的时间,都会全面控制某个goroutine要在某个逻辑处理器上运行。

Go语言的并发同步模型来自一个叫做通信顺序进程Communicating Sequential Processes, CSP的范型paradigm。CSP是一种消息传递模型,通过在goroutine间传递数据来传递消息,而不是对数据进行加锁从而实现同步。用于在goroutime之间同步和传递数据的关键数据类型叫作通道channel

并发与并行

什么是操作系统的线程(thread)和进程(process)。当运行一个应用程序时,操作系统会为其启动一个进程。可以将这个进程看作一个包含了应用程序在运行中需要用到和维护的各种资源的容器。

进程中的资源包括但不限于内存地址空间、文件和设备的句柄以及线程。一个线程是一个执行空间,这个空间会被操作系统调度用来运行函数中所写的代码。每个进程至少包含一个线程,每个进程的初始线程被称为主线程。因为执行这个线程的空间是应用程本身的空间,所以当主线程终止时,应用程序也会终止。操作系统会将线程调度到某个处理器上运行,但这个处理器不一定是进程所在的处理器。

操作系统会在物理处理器上调度线程来运行,而Go的运行时会在逻辑处理器上调度goroutine来运行。Go1.5之前,默认给整个应用程序只分配一个逻辑处理器;1.5之后运行时会默认为每个可用的物理处理器分配一个逻辑处理器。这些逻辑处理器会用于执行所有被创建的goroutime,即便只有一个逻辑处理器,Go依然可以并发调度无数个goroutine

如果创建一个goroutine并准备运行,这个goroutine就会被放到调度器的全局队列中,之后调度器就会将队列中的goroutine分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中,本地运行队列中的goroutine会一直等待直到自己被分配逻辑处理器执行。

有时,正在运行的goroutine需要执行一个阻塞的系统调用,如打开一个文件。当这类调用发生时,线程和goroutine会从逻辑处理器上分离,该线程会继续阻塞,等待系统调用的返回。同时,这个逻辑处理器就失去了用来运行的线程。所以调度器会创建一个新线程,并将其绑定到该逻辑处理器上。之后,调度器会从本地运行队列中选择另一个goroutine来运行。一旦被阻塞的系统调用执行完成并返回,对应的goroutine会放回到本地运行队列,而之前的线程会保存好,以便后面继续使用。

调度器对可以创建的逻辑处理器的数量没有限制,但语言运行时默认限制每个程序最多创建10000个线程,超过这个限制,就会崩溃,可以通过调用runtime/debug包中的SetMaxThreads来更改。

并发concurrency不是并行parallelism。并行是让不同的代码片段同时在不同的物理处理器上执行,其关键是同时做很多事情。并发是指同时管理很多事情。大部分情况下,并发的效果比并行好,因为操作系统和硬件的总资源一般很少,但能支持系统同时做很多事。

goroutine

下述程序会创建两个goroutine,以并发的形式分别显示大写和小写字母:

package main

import (
   "runtime"
   "sync"
   "fmt"
)

func main() {
   // 分配一个逻辑处理器给调度器用
   runtime.GOMAXPROCS(1)

   // wg用于等待程序完成
   var wg sync.WaitGroup
   // 计数+2.表示要等待两个goroutine
   wg.Add(2)

   fmt.Println("Start Goroutines")

   // 声明一个匿名函数,并创建一个goroutine
   go func() {
      defer wg.Done()

      for  count := 0; count < 3; count++ {
         for char := 'a'; char < 'a' + 26 ; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()

   // 声明一个匿名函数,并创建一个goroutine
   go func() {
      defer wg.Done()

      for  count := 0; count < 3; count++ {
         for char := 'A'; char < 'A' + 26 ; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()

   fmt.Println("Waiting To Finish")
   // 等待goroutine
   wg.Wait()

   fmt.Printf("\n Termingating Program")
}

runtime包的GOMAXPROCES函数用于更改调度器可以使用的逻辑处理器数量。声明了两个匿名函数,分别通过关键字go创建goroutine来执行。

WaitGroup是一个计数信号量,用来记录并维护运行的goroutine,如果WaitGroup大于0,则Wait方法会阻塞。上述代码中将WaitGroup设为2,表示有2个正在运行的goroutine,为了减小WaitGroup的值并最终释放main函数,在两个匿名函数中,使用defer定声明了函数退出时调用Done方法。defer会修改函数调用时机,在正在执行的函数返回时,才会调用defer声明的函数。

Start Goroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z 
 Termingating Program

第一个goroutine完成显示花费的时间太短,以至于在调度器切换到第二个goroutine前就完成了所有任务,这就是为什么先输出了所有的大写字母,再输出了所有小写字母的原因。

基于调度器的内部算法,一个正在运行的goroutine在工作结束前,可以被停止并被重新调度。这样做的目的是防止某个goroutine长时间占用逻辑处理器。可通过下述例子说明:下述程序创建了一个需要长时间才能完成其工作 的goroutine

package main

import (
   "sync"
   "runtime"
   "fmt"
)

var wg sync.WaitGroup

func main() {
   // 分配一个逻辑处理器给调度器使用
   runtime.GOMAXPROCS(1)

   // 计数+2,表示要等待2个goroutine
   wg.Add(2)

   fmt.Println("Create Goroutines")
   // 创建两个goroutine
   go printPrime("A")
   go printPrime("B")

   fmt.Println("Waitting To Finish")
   // 等待goroutine结束
   wg.Wait()

   fmt.Println("Termination Program")

}

// 显示5000以内的素数
func printPrime(prefix string) {
   // 函数退出时,调用Done来通知main函数工作完成
   defer wg.Done()

next:
   for outer := 2; outer < 5000; outer++ {
      for inner := 2; inner < outer; inner++ {
         if outer % inner == 0 {
            continue next
         }
      }
      fmt.Printf("%s:%d\n", prefix, outer)
   }
   fmt.Println("Completed", prefix)
}

运行程序,会看到素数的前缀最开始为B,当到了一定数量后,前缀切换为A。之后前缀A运行一段时间后,又会切换到B去。每次运行程序,调度器切换的时间点可能会不同。

上述程序中都有用到runtime包中的GOMAXPROCS函数来设置调度器使用的逻辑处理器数量。用这个函数,可以给每个可用的物理处理器在运行时分配一个逻辑处理器,才能使得goroutine并行运行。

runtime.GOMAXPROCS(runtime.NumCPU())NumCPU()会返回可以使用的物理处理器的数量,因此这行代码就相当于给每个可用的物理处理器创建一个逻辑处理器。使用多个逻辑处理器并不意味着性能更优。修改用于显示字母表的代码,将其中的

    fmt.Println(runtime.NumCPU())
	runtime.GOMAXPROCS(runtime.NumCPU())

此时可以看到输出结果:

4 // 物理主机为4核,即有4个物理处理器
Start Goroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R a b c d e f g h i j k S l T U V W X Y Z A B C D m n o p q r s t u v w x y z a b E F G H I J K L M N O c d e f g h i j k l m P Q R S T U V W X Y Z A B C D E F G H I J K L M N O n o p P Q R S q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z T U V W X Y Z 
 Termingating Program

由于此时的goroutine是并行运行的,所以大小写字母会混合在一起显示。

竞争状态

如果多个goroutine在没有互相同步的情况下,访问某个共享资源,并视图读和写这个资源时,就会相互竞争的状态,这种情况被称为竞争状态race candition。竞争状态的存在让并发程序变得复杂,而且容易引起问题。对于共享资源的读与写 必须是原子化的,即同一时刻只能有一个goroutine对共享资源进行读和写操作。

package main

import (
   "sync"
   "runtime"
   "fmt"
)

var (
   // 共享资源
   counter int
   // wg用于等待程序结束
   wg sync.WaitGroup
)

func main() {
   wg.Add(2)

   go incCounter(1)
   go incCounter(2)

   wg.Wait()
   fmt.Println("Final Counter:", counter)
}

func incCounter(id int)  {
   defer wg.Done()

   for count := 0; count < 2; count++ {
      // 捕获counter的值
      value := counter
      // 当前goroutine从线程退出,并放回到队列
      runtime.Gosched()
      // 本地value自增
      value ++
      // 将值保存回counter
      counter = value
     
      // 输出当前值
      fmt.Printf("%d - %d \n", id, counter)
   }
}

上述程序中,共享变量counter会进行4次读和写,每个goroutine会执行2次,但程序结束时, counter的值却为2。程序执行过程如下图:

无法加载

上述程序创建了两个goroutine, incCounter函数对变量counter进行读和写操作,而该变量是共享资源。每个goroutine都会预先读出该变量的值,并将其副本存入到一个叫value的本地变量中,随后incCounter函数会对value的副本的值加1,最后将这个新值存到counter变量中。

runtime.Gosched()会将当前goroutine从线程中退出,这样给了其他goroutine运行的机会。在本程序中,这样做的目的是强制调度器切换goroutine,使得竞争状态的效果更明显。

Go中有检测代码竞争状态的工具,执行go build -race,然后运行创建后的程序,可以看到:

==================
WARNING: DATA RACE
Write at 0x0000005e09b0 by goroutine 7:
  main.incCounter()
      E:/GoProjects/src/demo/main.go:37 +0xb5

Previous read at 0x0000005e09b0 by goroutine 6:
  main.incCounter()
      E:/GoProjects/src/demo/main.go:31 +0x91

Goroutine 7 (running) created at:
  main.main()
      E:/GoProjects/src/demo/main.go:20 +0x90

Goroutine 6 (running) created at:
  main.main()
      E:/GoProjects/src/demo/main.go:19 +0x6f
==================
2 - 1
1 - 1
2 - 2
1 - 2
Final Counter: 2
Found 1 data race(s)

由上述竞争检测结果可知,竞争发生在counter = value value: counter go incCounter(1) go incCounter(2)这四行代码中。这几行分别是对counter的读和写操作。

要想消除竞争状态,可以使用Go语言提供的锁机制,来所住共享资源,从而保证goroutine的同步状态。

锁住共享资源

Go提供了传统的同步goroutine的机制,就是对共享资源加锁。如果需要顺序访问一个整型变量或一段代码,atomicsync包中的函数提供了很好的解决方案。

原子函数

原子函数能够以很底层的加锁机制来同步在访问整型变量和指针。

package main

import (
   "sync"
   "runtime"
   "fmt"
   "sync/atomic"
)

var (
   // 共享资源
   counter int64
   // wg用于等待程序结束
   wg sync.WaitGroup
)

func main() {
   wg.Add(2)

   go incCounter(1)
   go incCounter(2)

   wg.Wait()
   fmt.Println("Final Counter:", counter)
}

func incCounter(id int)  {
   defer wg.Done()

   for count := 0; count < 2; count++ {
      // 安全地对counter+1
      atomic.AddInt64(&counter, 1)
      runtime.Gosched()
   }
}

上述程序使用atomicAddInt64函数,这个函数会同步整型值的加法,通过强制同一时刻只能有一个goroutine运行并完成加法操作。当goroutine试图去调用任何原子函数时,这些goroutine都会自动根据所引用的变量做同步处理。

此外 还有两个函数LoadInt64StoreInt64,这两个原子函数提供安全地读和写一个整型值的方式。下述程序将使用这两个函数创建一个同步标志,该标志可以向程序里多个 goroutine通知某种特殊状态。

This blog is under a CC BY-NC-SA 3.0 Unported License
本文链接:http://yov.oschina.io/article/Go/Go Base/并发/